package com.atguigu.realtime.apps

import com.alibaba.fastjson.JSON
import com.atguigu.common.constants.{PrefixConstant, TopicConstant}
import com.atguigu.common.utils.JedisUtil
import com.atguigu.realtime.beans.StartLog
import com.atguigu.realtime.utils.{DStreamUtil, DateHandleUtil}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

/**
 * Created by Smexy on 2022/6/25
 *
 *    at least once + 幂等输出
 *        hbase:
 *        redis:
 */
object StartLogApp extends BaseApp {

  override var appName: String = "StartLogApp"
  override var groupId: String = "realtime220212"
  override var topic: String = TopicConstant.STARTUP_LOG
  override var batchDuration: Int = 10

  /**
   * Deduplicates start logs inside a single micro-batch.
   *
   * Logs are keyed by (start_date, mid); for each key only the record with
   * the smallest ts (the earliest start of that device on that day) is kept.
   *
   * @param rdd raw Kafka records for this batch
   * @return one StartLog per (date, device) pair
   */
  def removeDulicateLogInCommonBatch(rdd: RDD[ConsumerRecord[String, String]]): RDD[StartLog] = {

    val rdd1: RDD[((String, String), StartLog)] = rdd.map(record => {

      // Parse only the fields carried by the Kafka message.
      val startLog: StartLog = JSON.parseObject(record.value(), classOf[StartLog])

      // Derive the date/time columns from the event timestamp.
      startLog.start_time = DateHandleUtil.parseMillTsToDateTime(startLog.ts.toLong)
      startLog.start_date = DateHandleUtil.parseMillTsToDate(startLog.ts.toLong)

      // Build the row id.
      startLog.id = startLog.start_time + "_" + startLog.mid

      ((startLog.start_date, startLog.mid), startLog)

    })

    // Pure grouping would be groupByKey; grouping + aggregation is reduceByKey.
    // reduceByKey performs map-side combine (groupByKey does not), so picking
    // the minimum-ts log via reduceByKey is cheaper than groupByKey + sortBy.
    val rdd2: RDD[((String, String), StartLog)] = rdd1.reduceByKey((log1, log2) => {
      if (log1.ts < log2.ts) {
        log1
      } else {
        log2
      }
    })
    rdd2.values
  }

  /**
   * Filters out devices that were already written to HBase today, using the
   * per-day device set recorded in Redis.
   *
   * Redis K-V design:
   *   K: the date (one key per day)
   *   V: a Set of mids already recorded that day (Set deduplicates)
   *
   * Connection pattern: mapPartitions is used (one connection per partition,
   * and a return value is needed — foreachPartition returns nothing).
   *
   * @param rdd1 batch-deduplicated logs
   * @return logs whose (date, mid) has not been recorded in Redis yet
   */
  def removeDulicateLogWithHistoryBatch(rdd1: RDD[StartLog]): RDD[StartLog] = {

    rdd1.mapPartitions(partition => {

      val jedis: Jedis = JedisUtil.getJedisClient

      // BUG FIX: Iterator.filter is lazy. The original code returned the lazy
      // iterator and closed the connection immediately, so sismember was only
      // invoked later — on an already-closed Jedis client — when Spark consumed
      // the partition. Materialize the filtered records while the connection is
      // still open, and close it even if sismember throws.
      val kept: List[StartLog] =
        try {
          partition
            .filter(log => !jedis.sismember(PrefixConstant.dau_redis_Preffix + log.start_date, log.mid))
            .toList
        } finally {
          jedis.close()
        }

      kept.iterator

    })

  }

  def main(args: Array[String]): Unit = {

    context = new StreamingContext("local[*]", appName, Seconds(batchDuration))

    runApp {

      // Acquire the Kafka stream.
      val ds: InputDStream[ConsumerRecord[String, String]] = DStreamUtil.createDS(context, groupId, topic)

      // Transform + output.
      ds.foreachRDD(rdd => {

        if (!rdd.isEmpty()) {

          // Capture offsets before any transformation; committed only after a
          // successful write (at-least-once + idempotent sinks).
          val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

          // Deduplicate inside the current batch.
          val rdd1: RDD[StartLog] = removeDulicateLogInCommonBatch(rdd)

          // Deduplicate against history (Redis-backed).
          val rdd2: RDD[StartLog] = removeDulicateLogWithHistoryBatch(rdd1)

          // rdd2 is used by three actions (count, saveToPhoenix, foreachPartition);
          // cache it so the Redis-filtered result is computed once.
          rdd2.cache()

          println("即将写出:" + rdd2.count())

          // Write the filtered logs to HBase via Phoenix.
          // Import brings saveToPhoenix into scope.
          import org.apache.phoenix.spark._

          /*
                   tableName: String,
                   cols: Seq[String]:   fields of T in RDD[T] must map 1:1 to the target table columns

                   conf: Configuration = new Configuration,
                                       HBaseConfiguration.create() starts from new Configuration and
                                          additionally loads hbase-site.xml and hbase-default.xml
                   zkUrl: Option[String] = address of the ZK quorum HBase depends on
           */
          rdd2.saveToPhoenix("REALTIME2022_STARTLOG",
            Seq("ID","OPEN_AD_MS","OS","CH","IS_NEW","MID","OPEN_AD_ID","VC","AR",
              "UID","ENTRY","OPEN_AD_SKIP_MS","MD","LOADING_TIME","BA","TS","START_DATE","START_TIME"),
            HBaseConfiguration.create(),
            Some("hadoop102:2181")
          )

          // Record the written device ids in Redis (one connection per partition).
          rdd2.foreachPartition(partition => {

            val jedisClient: Jedis = JedisUtil.getJedisClient

            try {
              partition.foreach(log => {

                jedisClient.sadd(PrefixConstant.dau_redis_Preffix + log.start_date, log.mid)

                // Keys only need to live ~1 day; Redis eviction is configured
                // with ALLKEYS_LRU (least-recently-used), so no explicit TTL.
                //jedisClient.expire(PrefixConstant.dau_redis_Preffix+log.start_date , PrefixConstant.start_log_max_delay_time)

              })
            } finally {
              jedisClient.close()
            }

          })

          // All actions on rdd2 are done — release the cached blocks.
          rdd2.unpersist()

          // Commit offsets only after both sinks have been written.
          ds.asInstanceOf[CanCommitOffsets].commitAsync(ranges)

        }

      })

    }

  }
}
